Query evaluator design
信頼性に関するストーリーを決定した後、我々はクエリ評価エンジンのプロトタイピングを開始しました。BartとBrianによる最初のプロトタイプの試みは、Orleansをベースに、スタンディングクエリとストリーム(実際には単なるサブジェクト)をアクターとしてモデル化し、実行を個々のクエリ演算子にまで分解して、そのような演算子をすべてグレインとして表現することで、サブコンピューティングを最大限に再利用できるようにしました。当初は、デフォルトで「共通部分式除去」(CSE)を行うことを考えていましたが、非常にコストがかかることが判明しました。さらに、当時のOrleansには信頼性の高い「仮想ストリーム」機能が存在しなかったため、信頼性の高いRxの抽象化を下層の実行ファブリックにマッピングすることが困難でした。
After deciding on a reliability story, we started prototyping the query evaluator engine. An initial prototype attempt by Bart and Brian was based on Orleans, modeling standing queries and streams (really just subjects) as actors, decomposing execution all the way to individual query operators in order to enable maximum reuse of subcomputations, by representing all such operators as grains. The initial thinking here was to get to “common subexpression elimination” (CSE) by default, but it proved to be prohibitively expensive. In addition, no reliable “virtual streams” capability existed in Orleans at that time, making it hard to map our reliable Rx abstraction onto the underlying execution fabric.
その後、ReaqtorチームとOrleansチームの会話により、IRPと似たようなインターフェースを持つOrleansのバーチャルストリームが導入されました。
Conversations between the Reaqtor and Orleans teams at a later point led to the introduction of virtual streams in Orleans, which have interfaces that share some similarities with IRP.
最終的に、私たちは振り出しに戻り、Bart氏は、Bart氏とWes氏がCloud Programmability Team時代に構築したRx-as-a-Serviceのデモをベースに、前述のIRPの抽象化を用いて、中核となるクエリエンジンの実装を開始しました。最初から次のような設計原則が提示されました。
Ultimately, we went back to the drawing board and Bart started an implementation of the core query engine roughly based on the demo for Rx-as-a-service which Bart and Wes built back in the Cloud Programmability Team days, using the IRP abstractions described earlier. The following design principles were put forward from the get go:
予測可能なパフォーマンスと計算の再現性を確保するために、イベント処理のコードパス上のI/Oを避けます。
Avoid I/O on the event processing code path for predictible performance and to ensure repeatability of the computation.
コアとなるクエリ演算子の同期実行により、最小の同期要件で最大のスループットを実現します。
Synchronous execution of core query operators for maximum throughput, with minimal synchronization requirements.
クエリ演算子ライブラリとホスト環境への依存性を最小限に抑えることができます。
Minimal dependencies of the query operator library and the hosting environment.
過剰なコンテキストスイッチを避けるためにスレッドを厳密に制御しています。
Tightly controlled threading to avoid excessive context switching.
オブジェクトのレイアウトを最適化することで、スタンディングクエリの高密度化を実現します。
Achieve high density of standing queries by optimizing object layouts.
オペレータの状態をチェックポイントするための最小のオーバーヘッド。
Minimal overhead for checkpointing of operator state.
コアとなるクエリエンジンの設計と実装には、Tiho Tarnavsi(以前はStreamInsightに携わっていた)、ミュンヘンのMSRチームの様々な人々、そしてEric Rozellも貢献しています。この地理的に分散した仮想チームは、クエリエンジンの設計の最終的な詳細を詰めていき、2013年末に最初のバージョンを完成させました。
Other contributors to the design and implementation of the core query engine include Tiho Tarnavsi (who previously worked on StreamInsight), various folks from an MSR team in Munich, and Eric Rozell. This virtual geodistributed team ironed out the final details of the query engine design and we finished the first version of it in late 2013.
まず、異なる信頼性モデルを評価するために、2種類のエンジンを実装しました。1つは信頼性を実現するためにアクティブ/アクティブ戦略を用い、もう1つはチェックポイント/リプレイ戦略を用いました。ミュンヘンのチームは、様々なワークロードに対して両アプローチを対比・比較するために、様々な評価を行いました。その結果、メモリ使用量、イベントの複製、復旧時間の3つが重要なトレードオフとなりました。最終的に、私たちは信頼性を IQueryEngine インターフェースに抽象化し、高密度を実現するための最も有望なルートとして CheckpointingQueryEngine の実装を進めました(ワーキングセットが原動力となりました)。今日まで、ほとんどのReaqtorのデプロイメントは、このチェックポイント/リプレイエンジンで実行されていますが、可用性の高い低レイテンシーのワークロードは、アクティブ/アクティブなクエリエンジンで簡単にホストすることができます。
First, we implemented two variants of the engine to evaluate different reliability models. One used an active/active strategy to achieve reliability, while another one used a checkpoint/replay strategy. The Munich team did perform various evaluations to contrast and compare both approaches for various workloads. Not surprisingly, the key trade-offs are between memory usage, replication of events, and recovery time. Ultimately, we decided to abstract out reliability to land our IQueryEngine interface, and proceeded to implement the CheckpointingQueryEngine as the most promising route to achieve high density (with working set being the driving factor), leaving the possibility to add other implementations for reliability at a future point. To this day, most Reaqtor deployments are running with this checkpoint/replay engine, but highly available low latency workloads could be hosted in an active/active query engine quite easily.
実際、Reaqtorエンジンのあるホスティング環境は、デザインスペースの非常に興味深いコーナーで使用されており、何十億もの静止したクエリがあり、それらのクエリの99.9%は1日に数回のイベントしか処理していません。この環境では、チェックポインティングクエリエンジンを採用しており、イベントを受信し、そのイベントの対象となるクエリ式を含むクエリエンジンを回復させ、イベントを取り込み、チェックポインティングとアンロードによってクエリエンジンを再び休止状態にします。これは事実上、イベントベースの起動メカニズムを持つマイクロコンピュータのページングシステムです。
In fact, one hosting environment for the Reaqtor engine has been used in a quite interesting corner of the design space, with billions of standing queries with 99.9% of those queries only handling a few events per day. This deployment employs the checkpointing query engine by receiving an event, recovering the query engine(s) containing query expressions for which the event is destined, ingesting the event, and then putting the query engine(s) in a dormant state again by checkpointing and unloading. This is effectively a paging system for micro-computations with an event-based activation mechanism.
次に、スケジューラを物理的/論理的に重ねることにしました。RxにはISchedulerインターフェースがあり、様々な並行性のソースに対応し、時間の概念を抽象化しています。Reaqtorのコンテキストでは、スケジューラ上の個々のクエリ演算子のパラメータ化はほとんど価値がありません。なぜなら、スケジューラをホスティング環境にバインドしたいからです。さらに、故障の単位を物理的なマシンサイズから切り離して密度を高めるために、同じプロセス内で多くのクエリエンジンをホストできるようにしたいと考えていました。現在、Reaqtorで構築されたサービスの実稼働環境では、1プロセスあたり数十個のクエリエンジンインスタンスが稼働しており、それぞれが数万件のスタンディングクエリをホストしています。このような各クエリエンジンは、きめ細かなフェイルオーバーのために、Service Fabricの信頼性の高いステートフルサービスに1対1でマッピングされています。このため、同一プロセス内で物理スレッドの固定プールを共有する論理スケジューラが導入されました。各クエリエンジンには、独立したライフタイムを持つ独自の論理スケジューラがあり、チェックポイントを容易にするためにポーズ/リジュームをサポートしています。物理スケジューラは、その子の論理スケジューラから作業を盗むことを実行します。
Second, we decided on a physical/logical layering of schedulers. Rx has an IScheduler interface to adapt to various sources of concurrency and to abstract over the notion of time. In the context of Reaqtor, parameterization of individual query operators on schedulers has little value, because we want to bind the scheduler to the hosting environment. In addition, we wanted to be able to host many query engines within the same process, to achieve density while making the unit of failure decoupled from the physical machine size. Today, production deployments of services built on Reaqtor run tens of query engine instances per process, each of them hosting tens of thousands of standing queries. Each such query engine is mapped 1-to-1 to a reliable stateful service in Service Fabric for fine-grained failover. This led to the introduction of logical scheduler which share a fixed pool of physical threads within the same process. Each query engine has its own logical scheduler which has an independent lifetime and supports pause/resume to facilitate checkpointing. The physical scheduler performs work stealing from its child logical schedulers.
論理スケジューラで一時停止/再開をサポートするという選択は、さまざまなチェックポイントオプションを評価したことがきっかけでした。StreamInsight時代に使用したそのようなオプションの1つは、データフローパスにチェックポイントマーカーを流すことでした。様々なプロトタイプでは、Rx代数が提供するダイナミズム、特に高次のクエリ演算子を考慮すると、これは非常に厄介であり、チェックポイントの実行にかかる時間の上限を提供することは困難であることがわかりました。さらに、I/Oを削減するための差分チェックポイントを容易にするために、問い合わせ演算子がダーティであるかどうかを検出することも困難です。これらの発見に基づき、私たちはStop-the-Worldチェックポイントのアプローチを決定しましたが、一時停止の時間を最小限に抑えるために、様々な設計ポイントやメカニズムを導入しました。
The choice to support pause/resume on logical schedulers was motivated by an evaluation of different checkpointing options. One such option we used in the StreamInsight days is to flow a checkpoint marker through the data flow path. Various prototypes showed that this gets quite tricky given the dynamism provided by the Rx algebra, in particular higher order query operators, and it’s hard to provide an upper bound to the time taken to perform a checkpoint. Moreover, detecting whether query operators are dirty, to facilitate differential checkpointing to reduce I/O, gets tricky as well. Based on these findings, we decided on a stop-the-world checkpoint approach, but with various design points and mechanisms in place to reduce the pause time to a minimum:
クエリ演算子にトライステートフラグを導入することで、ステートのスナップショット中のイベント処理のみを一時停止し、恒久的なストレージへの書き込み時間を含まない(I/O拘束)。
Only pausing event processing during the snapshotting of state, thus not including the time to write to permanent storage (I/O bound), by introducing a tri-state flag on query operators.
不変的なデータ(例:式木やメタデータの形でのスタンディングクエリの定義)と、可変的なデータ(例:演算子の状態)をきれいに分離する。
Clean separation of immutable data (e.g. definition of standing queries in the form of expression trees and metadata) from mutable data (e.g. operator state).
高次の演算子を含むような高度に動的なクエリ式に対して、クエリをより小さな断片に分解することで、状態を分割します。
Partitioning of state for highly dynamic query expressions, e.g. involving higher-order operators, by decomposing queries into smaller pieces.
パーシステンスが必要なダーティな演算子を検出するための軽量なメカニズムを備えた差分チェックポイントのサポート。
Support for differential checkpoints with a lightweight mechanism to detect dirty operators that need persistence.
エンジンの設計作業は、ホストされているオペレーターライブラリと一体となって行われ、両者の間の最小限のインターフェースを確保しました。最終的には、新たに構築したオペレータライブラリをOSS版のRxに収束させることを目標とし、この人気の高いライブラリをホステッドサービスで動作させるための改良を加え、信頼性のメカニズムなどを提供しました。これにより、ISubscribable<T>の形式化が行われ、演算子ツリーをトラバースするためのビジターパターンのサポートが追加されました。従来のRxでは、IDisposableインターフェイスは、ノードのトラバーサルパターンを使用してスタンディングイベントプロセッシングクエリを解体するために使用されます。ISubscribable<T>では、ISubscriptionVisitorインターフェイスを使用して、(動的な)演算子ツリーの他のトラバースをサポートしており、さまざまな使用が可能です。
The design work on the engine was done hand-in-handy with the hosted operator libraries to ensure a minimal interface between both. Ultimately, our goal was to converge the newly built operator library with the OSS version of Rx, merely adding enlightenments for running this popular library in a hosted service, offering reliability mechanisms and more. This led to the formalization of ISubscribable<T> where we added support for a visitor pattern to traverse an operator tree. In classic Rx, the IDisposable interface is used to tear down a standing event processing query using some traversal pattern of the nodes. With ISubscribable<T>, we support other traversals of the (dynamic) operator tree using an ISubscriptionVisitor interface, enabling a variety of uses:
ホストのスケジューラやログ機能などのコンテキストを提供し、スタンディングクエリを初期化します。
Initialize a standing query by providing it context such as the host’s scheduler, logging facilities, etc.
ツリー内のリーフオブザーバブルノードにシグナルを送信することで、スタンディングクエリの処理を開始します(事実上、イベントの「蛇口」を開きます)。
Start processing a standing query by sending a signal to the leaf observable nodes in the tree (effectively opening the “tap” of events).
演算子ツリーをトラバースし、ダーティフラグをチェックすることで、最後にチェックポイントが成功してからノードの状態が変化したかどうかを検出します。
Traverse the operator tree to detect whether any node has state changes since the last successful checkpoint by checking dirty flags.
演算子の状態をチェックポイントするために、ステートライターを各ノードに提供し、明確に定義されたトラバーサル順序で実行します。
Perform a checkpoint of operator state by providing a state writer to each node in a well-defined traversal order.
逆に、ステートリーダーを各ノードにうまく定義されたトラバーサル順序で提供することで、オペレータの状態を復元します。
Conversely, restore operator state by providing a state reader to each node in a well-defined traversal order.
IDisposableに似た、リソースの解放を可能にする、待機中のクエリのグレースフルなアンロード。
Graceful unloading of standing queries to allow releasing of resources, akin to IDisposable.
例えば、ソースやシンクの参照カウントをデクリメントすることで、継続的なクエリを終了させることができます。
Termination of a standing query, for example by decrementing reference count on sources and sinks.
従来のRxと比較して、ISubscribable<T>インターフェイスは、様々な操作を実行するための(動的に進化する)クエリ演算子ツリーへのアクセスを可能にするISubscriptionを返すことで、スタンディングクエリのライフサイクルに、よりきめ細かい段階を追加します。サブスクライバー(IObserver<T>)をイベントのソースと関連付けるためにSubscribeを使用しただけでは、計算は開始されず、むしろ最初の演算子ツリーのファクトリとして機能します。このパターンは、クライアントアプリケーションでも使用できます。例えば、アプリケーションの状態をディスクに永続化することで、アプリケーションの破棄をサポートしたり、同じスケジューラをすべてのオペレータノードに配布して、手動でパラメータ化する必要をなくしたりします。
Compared to classic Rx, the ISubscribable<T> interface adds more granular phasing to the lifecycle of standing queries, by returning an ISubscription which gives access to the (dynamically evolving) query operator tree to perform a variety of operations. The mere use of Subscribe to associate a subscriber (in the form of an IObserver<T>) with a source of events does not kick off the computation but rather acts as the factory of an initial operator tree, with subsequent operations taking care of providing additional context, restoring state from a previous checkpoint, starting the event flow, etc. This pattern is also useful in client applications, for example to support tombstoning of an application by persisting its state to disk, or to distribute the same scheduler to all operator nodes rather than manually parameterizing it everywhere.
クエリエンジンの他の側面としては、イベント処理クエリを表す式木を受け入れ、エンジン内でホストされているアーティファクト(いわゆる「レジストリ」または「カタログ」)にクエリを行うためのIRPインターフェースの実装や、バインドされたアーティファクトの動的検索を可能にするリゾルバのパラメータ化があり、エンジン間の通信によりノード間での計算のスケールアウトを可能にしています。
Other aspects of the query engine include its implementation of IRP interface to accept expression trees representing event processing queries and to query the artifacts hosted within an engine (the so-called “registry” or “catalog”), and the parameterization on resolvers which enable dynamic lookup of bound artifacts, allowing for cross-engine communication to scale out computation across nodes.
クエリエンジンの初期実装の途中で、Eric Rozell がコアフレームワークチームに加わり、式木のシリアル化、イベントオブジェクトをシリアル化するためのデータモデル、式木のバインダーなど、幅広い実装面で活躍しました。コアエンジンへの他の貢献者としては、Alex Clemmer(高次のクエリ演算子のサポートに従事)、Pranav Senthilnathan(トランザクションロギングと様々な形のクエリ最適化に従事)などがいます。
Halfway through the initial implementation of the query engine, Eric Rozell joined the core framework team, working on a wide range of implementation aspects including expression tree serialization, the data model to serialize event objects, expression tree binders, etc. Other contributors to the core engine include Alex Clemmer (who worked on higher-order query operator support) and Pranav Senthilnathan (who worked on transaction logging and various forms of query optimization).